# -*- coding: utf-8 -*-
#
# Copyright 2015 Twitter Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""This is an integration test for the GCS-luigi binding.

This test requires credentials that can access GCS, plus access to the bucket
configured below. Follow the gcloud tools' directions to set up local credentials.
"""

import googleapiclient.errors
import oauth2client
import os
import tempfile
import unittest

from luigi.contrib import gcs
from target_test import FileSystemTargetTestMixin

# Point these at your own GCS project and bucket (through the environment
# variables named here) before running this test. Unfortunately there is no
# mock for GCS, so the placeholders below will not work as-is.
PROJECT_ID = os.getenv('GCS_TEST_PROJECT_ID', 'your_project_id_here')
BUCKET_NAME = os.getenv('GCS_TEST_BUCKET', 'your_test_bucket_here')

# Application-default credentials, looked up once when the module is imported.
CREDENTIALS = oauth2client.client.GoogleCredentials.get_application_default()
# Module-wide flag: flipped to True once setUp has attempted bucket creation.
ATTEMPTED_BUCKET_CREATE = False


def bucket_url(suffix):
    """Return the ``gs://`` URL for ``suffix`` inside the test bucket."""
    template = 'gs://{bucket}/{path}'
    return template.format(bucket=BUCKET_NAME, path=suffix)


class _GCSBaseTestCase(unittest.TestCase):
    """Shared fixture for the GCS integration tests.

    Each test gets a fresh ``GCSClient``. Bucket creation is attempted only
    once per process (tracked by ``ATTEMPTED_BUCKET_CREATE``), and everything
    listed under the bucket root is removed before every test.
    """

    def setUp(self):
        """Build the client, ensure the test bucket exists, then empty it."""
        self.client = gcs.GCSClient(CREDENTIALS)

        global ATTEMPTED_BUCKET_CREATE
        if not ATTEMPTED_BUCKET_CREATE:
            try:
                # Create the bucket the tests actually use (BUCKET_NAME, the
                # one bucket_url() points at); PROJECT_ID only identifies the
                # owning project.
                self.client.client.buckets().insert(
                    project=PROJECT_ID, body={'name': BUCKET_NAME}).execute()
            except googleapiclient.errors.HttpError as ex:
                if ex.resp.status != 409:  # bucket already exists
                    raise

            ATTEMPTED_BUCKET_CREATE = True

        # Clear out whatever earlier tests left behind in the bucket.
        for item in self.client.listdir(bucket_url('')):
            self.client.remove(item)


class GCSClientTest(_GCSBaseTestCase):
    """Integration tests for ``GCSClient`` run against the real test bucket."""

    def test_not_exists(self):
        """A path that was never written is neither an object nor a directory."""
        self.assertFalse(self.client.exists(bucket_url('does_not_exist')))
        self.assertFalse(self.client.isdir(bucket_url('does_not_exist')))

    def test_exists(self):
        """An uploaded object exists and is not reported as a directory."""
        self.client.put_string('hello', bucket_url('exists_test'))
        self.assertTrue(self.client.exists(bucket_url('exists_test')))
        self.assertFalse(self.client.isdir(bucket_url('exists_test')))

    def test_mkdir(self):
        """mkdir produces a path that both exists and is a directory."""
        self.client.mkdir(bucket_url('exists_dir_test'))
        self.assertTrue(self.client.exists(bucket_url('exists_dir_test')))
        self.assertTrue(self.client.isdir(bucket_url('exists_dir_test')))

    def test_mkdir_by_upload(self):
        """Uploading a nested object makes its top-level prefix a directory."""
        self.client.put_string('hello', bucket_url('test_dir_recursive/yep/file'))
        self.assertTrue(self.client.exists(bucket_url('test_dir_recursive')))
        self.assertTrue(self.client.isdir(bucket_url('test_dir_recursive')))

    def test_download(self):
        """download returns a readable object holding the uploaded bytes."""
        self.client.put_string('hello', bucket_url('test_download'))
        fp = self.client.download(bucket_url('test_download'))
        # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
        self.assertEqual(b'hello', fp.read())

    def test_rename(self):
        """Renaming an object removes the source and creates the destination."""
        self.client.put_string('hello', bucket_url('test_rename_1'))
        self.client.rename(bucket_url('test_rename_1'), bucket_url('test_rename_2'))
        self.assertFalse(self.client.exists(bucket_url('test_rename_1')))
        self.assertTrue(self.client.exists(bucket_url('test_rename_2')))

    def test_rename_recursive(self):
        """Renaming a directory moves the objects stored beneath it as well."""
        self.client.mkdir(bucket_url('test_rename_recursive'))
        self.client.put_string('hello', bucket_url('test_rename_recursive/1'))
        self.client.put_string('hello', bucket_url('test_rename_recursive/2'))
        self.client.rename(bucket_url('test_rename_recursive'), bucket_url('test_rename_recursive_dest'))
        self.assertFalse(self.client.exists(bucket_url('test_rename_recursive')))
        self.assertFalse(self.client.exists(bucket_url('test_rename_recursive/1')))
        self.assertTrue(self.client.exists(bucket_url('test_rename_recursive_dest')))
        self.assertTrue(self.client.exists(bucket_url('test_rename_recursive_dest/1')))

    def test_remove(self):
        """A removed object no longer exists."""
        self.client.put_string('hello', bucket_url('test_remove'))
        self.client.remove(bucket_url('test_remove'))
        self.assertFalse(self.client.exists(bucket_url('test_remove')))

    def test_remove_recursive(self):
        """Removing a directory also removes every object beneath it."""
        self.client.mkdir(bucket_url('test_remove_recursive'))
        self.client.put_string('hello', bucket_url('test_remove_recursive/1'))
        self.client.put_string('hello', bucket_url('test_remove_recursive/2'))
        self.client.remove(bucket_url('test_remove_recursive'))

        self.assertFalse(self.client.exists(bucket_url('test_remove_recursive')))
        self.assertFalse(self.client.exists(bucket_url('test_remove_recursive/1')))
        self.assertFalse(self.client.exists(bucket_url('test_remove_recursive/2')))

    def test_listdir(self):
        """listdir yields the same entries with or without a trailing slash."""
        self.client.put_string('hello', bucket_url('test_listdir/1'))
        self.client.put_string('hello', bucket_url('test_listdir/2'))

        self.assertEqual([bucket_url('test_listdir/1'), bucket_url('test_listdir/2')],
                         list(self.client.listdir(bucket_url('test_listdir/'))))
        self.assertEqual([bucket_url('test_listdir/1'), bucket_url('test_listdir/2')],
                         list(self.client.listdir(bucket_url('test_listdir'))))

    def test_put_file(self):
        """put uploads a local file whose bytes can be downloaded back."""
        with tempfile.NamedTemporaryFile() as fp:
            fp.write(b'hi')
            # Flush so the bytes are on disk before put() reads the file by name.
            fp.flush()

            self.client.put(fp.name, bucket_url('test_put_file'))
            self.assertTrue(self.client.exists(bucket_url('test_put_file')))
            self.assertEqual(b'hi', self.client.download(bucket_url('test_put_file')).read())


class GCSTargetTest(_GCSBaseTestCase, FileSystemTargetTestMixin):
    """Exercises ``GCSTarget`` through ``FileSystemTargetTestMixin``."""

    def create_target(self, format=None):
        """Return a ``GCSTarget`` in the test bucket named after this test's id."""
        # The object path is derived from self.id(), the id of the running test.
        target_path = bucket_url(self.id())
        return gcs.GCSTarget(target_path, client=self.client, format=format)
